package io.gitee.thant.utils;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Function;
import java.util.zip.CRC32;

/**
 * 功能描述: socket通讯包装类(目前仅UDP)
 * @author xiaowf
 * @version 2017年11月22日
 * @since JDK 1.7
 */
public class PPTalk {
	private static final byte[] MAGICPREFIX = "@PT#".getBytes();		//数据包特殊标识符
	private static final int PACKVER = 1;								//数据包版本号
	private static final int MAXUDPPACKAGESIZE = 65507;				//默认数据包大小
	private static final String SYSCMD_FEEDBACK = "<*-FEEDBACK-*>";	//系统数据包:收到反馈命令
	private static final int RETRYTIMES = 3;							//发送失败的重试次数
	private Core _core;
	private static final Map<String, FeedInfo> _feed = new ConcurrentHashMap<String, FeedInfo>();
	
	/**
	 * 功能描述:反馈包
	 * @author xiaowf
	 * @version 2017年11月22日
	 * @since JDK 1.7
	 */
	class FeedInfo {
		public boolean ok;
		public String msg;
		
		FeedInfo(boolean ok, String msg) {
			this.ok = ok;
			this.msg = msg;
		}
	}
	
	/**
	 * 功能描述: 发送数据包
	 * @author xiaowf
	 * @version 2017年11月22日
	 * @since JDK 1.7
	 */
	class SendPack {
		public SocketAddress addr;
		public ByteBuffer data;
		
		public SendPack(SocketAddress address, ByteBuffer buf) {
			addr = address;
			data = buf;
		}
	}

	public class RecvPack {
		private String packid;
		private String cmd;
		private InetSocketAddress addr;
		private boolean feedback;
		private byte[] data;
		
		public String getId() {
			return packid;
		}

		public String getCmd() {
			return cmd;
		}

		public byte[] getData() {
			return data;
		}

		public InetSocketAddress getAddress() {
			return addr;
		}

		public boolean isFeedback() {
			return feedback;
		}
		
		public RecvPack(String id, String cmd, InetSocketAddress addr, byte[] dat, boolean fb) {
			this.packid = id;
			this.cmd = cmd;
			this.addr = addr;
			this.data = dat;
			this.feedback = fb;
		}
	}

	/**
	 * 功能描述: 后台收发线程类
	 * @author xiaowf
	 * @version 2017年11月22日
	 * @since JDK 1.7
	 */
	class Core {
		private int _localport;
		private boolean _loop = true;
		private Function<RecvPack, Object> _recvobj;
		public DatagramChannel _channel = null;
		private Thread[] threadA = {null, null};
		private Queue<SendPack> _sendLst = new ConcurrentLinkedQueue<SendPack>();
		private Queue<RecvPack> _recvLst = new ConcurrentLinkedQueue<RecvPack>();
		
		Core(int port, Function<RecvPack, Object> recvobj) {
			_localport = port;
			_recvobj = recvobj;
			try {
				_channel = DatagramChannel.open();
				_channel.configureBlocking(false);
				_channel.socket().bind(new InetSocketAddress(_localport));
			} catch (Exception e) {
				e.printStackTrace();
			}
			
			threadA[0] = new Thread(){
				public void run() {
					socketThread();
				}
			};
			
			threadA[1] = new Thread(){
				public void run() {
					bussThread();
				}
			};
		}
		
		/**
		 * 功能描述:发送数据（内部函数，非API）
		 * <pre></pre> 
		 * @param type    发送类型
		 * @param addrstr 发送地址
		 * @param port    发送端口
		 * @param content 数据
		 * @param inf     反馈包
		 */
		public void send(String id, String type, InetSocketAddress addr, byte[] content, FeedInfo inf) {
			ByteBuffer pack = doPack(id, type, content, inf == null ? 0 : 1);
			SendPack task = new SendPack(addr, pack);
			_sendLst.add(task);
		}
		
		/**
		 * 功能描述:接收处理过程
		 * <pre></pre> 
		 * @param key          当前选中的套接字
		 * @param recvBuf      数据接收缓冲区
		 * @throws IOException socket异常
		 */
		private void doRecv(SelectionKey key, ByteBuffer recvBuf) throws IOException {
			DatagramChannel dc = (DatagramChannel)key.channel();
			InetSocketAddress client = (InetSocketAddress)dc.receive(recvBuf);
			//String clientaddr = client.getAddress().getHostAddress();
			//int clientport = client.getPort();
			
			recvBuf.flip();
			Map<String, Object> head = readHead(recvBuf);
			if (head == null) {
				System.out.println("无效包.");
				return;
			}
			
			byte[] dat = new byte[recvBuf.remaining()];
			recvBuf.get(dat);
			recvBuf.clear();
			
			String errmsg = null;
			boolean feedback = (1 == (int)head.get("feedback"));
			String packid = (String)head.get("pid");
			String cmd = (String)head.get("type");
			switch (cmd) {
			case SYSCMD_FEEDBACK:
				boolean ok = true;
				if (dat.length>0) {
					//接收端处理出错了,包余下部分是出错信息
					errmsg = new String(dat);
					ok = false;
				}
				FeedInfo inf = _feed.get(packid);
				if (inf != null) {
					synchronized(inf) {
						inf.ok = ok;
						inf.msg = errmsg;
						inf.notify();
					}
				}
				return;
			default:
				RecvPack recvpack = new RecvPack(packid, cmd, client, dat, feedback);
				_recvLst.add(recvpack);

				//这里直接发反馈包，速度快，但业务端不能控制重发
				if (recvpack.isFeedback()) {
					send(recvpack.getId(), SYSCMD_FEEDBACK, recvpack.getAddress(), null == errmsg ? null : errmsg.getBytes(), null);
					//ByteBuffer pack = doPack(recvpack.getId(), SYSCMD_FEEDBACK, null == errmsg ? null : errmsg.getBytes(), 0);
					//_channel.send(pack, recvpack.getAddress());
				}
			}
		}
		
		/**
		 * 功能描述:发送数据处理过程
		 * <pre></pre> 
		 * @return  1/0 是否发送了数据
		 */
		private int doSend() {
			SendPack task = _sendLst.poll();
			if (null == task) return 0;
			
			try {
				_channel.send(task.data, task.addr);
			} catch (IOException e) {
				e.printStackTrace();
			}
			return 1;
		}
		
		/**
		 * 功能描述:  线程主体 
		 * @see java.lang.Runnable#run()
		 */
		public void socketThread() {
			try {
				int sz = _core._channel.getOption(StandardSocketOptions.SO_SNDBUF);
				System.out.println("sendbuf size="+sz);
			} catch (IOException e) {
				e.printStackTrace();
			}

			int cnt;
			try {
				Iterator<SelectionKey> iter;
				ByteBuffer recvBuf = ByteBuffer.allocateDirect(MAXUDPPACKAGESIZE);
				Selector rselector = Selector.open();
				Selector wselector = Selector.open();
				_channel.register(rselector, SelectionKey.OP_READ);
				_channel.register(wselector, SelectionKey.OP_WRITE);
				Set<SelectionKey> selKeys;
				while (_loop) {
					if (!_sendLst.isEmpty()) {
						wselector.select(1);
						selKeys = wselector.selectedKeys();
						cnt = selKeys.size();
						selKeys.clear();
						if (cnt>0) {
							doSend();
						}
					}
					
					rselector.select(1);
					selKeys = rselector.selectedKeys();
					iter = selKeys.iterator();
					while (iter.hasNext()) {
						SelectionKey key = iter.next();
						iter.remove();

						if (key.isReadable()) {
							doRecv(key, recvBuf);
						}
					}
				}
				wselector.close();
				rselector.close();
			} catch (Exception e) {
				e.printStackTrace();
			} finally {
				try {
					_channel.close();
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		}

		public void bussThread() {
			while (_loop) {
				RecvPack recvpack = _recvLst.poll();
				if (null == recvpack) {
					try { Thread.sleep(1); } catch (Exception e) { break; }
				} else {
					String errmsg = null;
					try {
						Object ret = _recvobj.apply(recvpack);
						if (ret instanceof Map) {
							@SuppressWarnings("unchecked")
							Object rawmsg = ((Map<String, Object>)ret).get("errmsg");
							if (rawmsg != null) errmsg = String.valueOf(rawmsg);
						} else if (ret instanceof String) {
							errmsg = ret.toString();
						} else if (null != ret) {
							errmsg = "不支持的返回类型";
						}
					} catch (Exception e) {
						errmsg = e.toString();
						e.printStackTrace();
					}

					/* 这里发反馈包 速度会慢一些 好处是业务端可以控制重发
					if (recvpack.isFeedback()) {
						send(recvpack.getId(), SYSCMD_FEEDBACK, recvpack.getAddress(), null == errmsg ? null : errmsg.getBytes(), null);
						//ByteBuffer pack = doPack(recvpack.getId(), SYSCMD_FEEDBACK, null == errmsg ? null : errmsg.getBytes(), 0);
						//_channel.send(pack, recvpack.getAddress());
					}*/
				}
			}
		}
		
		public void start() {
			_loop = true;
			threadA[0].start();
			threadA[1].start();
		}
		
		public void stop() {
			_loop = false;
			threadA[0] = null;
			threadA[1] = null;
		}
	}
	
	/**
	 * 功能描述:监听初始化
	 * <pre></pre> 
	 * @param localport 本地端口
	 * @param recvobj   监听回调函数
	 */
	public void listen(int localport, Function<RecvPack, Object> recvobj) {
		if (_core != null) {
			_core.stop();
		}
		_core = new Core(localport, recvobj);
		//new Thread(_core).start();
		_core.start();
	}
	
	public void stop() {
		if (_core != null) {
			_core.stop();
		}
	}
	
	/**
	 * 功能描述:数据打包
	 * <pre></pre> 
	 * @param id      数据包id，唯一且不重复
	 * @param type    数据包类型
	 * @param content 数据包数据
	 * @param needfeedback 是否需要反馈（是否需要接收端发送接受成功信息）
	 * @return  打好包的数据包对象
	 */
	private ByteBuffer doPack(String id, String type, byte[] content, int needfeedback) {
		int datlen = content == null ? 0 : content.length;
		HashMap<String, Object> head = new HashMap<String, Object>(3);
		head.put("pid", id);
		head.put("type", type);
		head.put("size", datlen);
		head.put("feedback", needfeedback);

		CRC32 crc = new CRC32();
		byte[] headstr = SerializeUtil.writeObject(head);
		int packlen = 4 //packsize int
			+ 8 //CRC long
			+ 4 //ver int
			+ 4 //head size int
			+ headstr.length + datlen;
		crc.update(PACKVER);
		crc.update(headstr.length);
		crc.update(headstr);
		if (datlen>0) crc.update(content);
		
		ByteBuffer buf = ByteBuffer.allocate(MAGICPREFIX.length + packlen);
		buf.put(MAGICPREFIX);
		buf.putInt(packlen);
		buf.putLong(crc.getValue());
		buf.putInt(PACKVER);
		buf.putInt(headstr.length);
		buf.put(headstr);
		if (datlen>0) buf.put(content);
		buf.flip();
		return buf;
	}

	/**
	 * 功能描述:解包
	 * <pre></pre> 
	 * @param buf 数据包对象
	 * @return    数据包头结构
	 */
	@SuppressWarnings("unchecked")
	private Map<String, Object> readHead(ByteBuffer buf) {
		byte[] tmp = new byte[MAGICPREFIX.length];
		buf.get(tmp, 0, MAGICPREFIX.length);
		if (!Arrays.equals(MAGICPREFIX, tmp))
			return null;
		
		int packlen = buf.getInt();
		long crcval = buf.getLong();
		int packver = buf.getInt();
		int headsize = buf.getInt();
		int datlen = packlen - 4 - 8 - 4 - 4 - headsize;
		byte[] headstr = new byte[headsize];
		buf.get(headstr, 0, headsize);
		
		CRC32 crc = new CRC32();
		crc.update(packver);
		crc.update(headsize);
		crc.update(headstr);
		if (datlen>0) {
			if (datlen != buf.remaining())
				return null;
			
			buf.mark();
			byte[] dat = new byte[datlen];
			buf.get(dat, 0, dat.length);
			buf.reset();

			crc.update(dat);
		}
		if (crc.getValue() != crcval) {
			return null;
		}
		
		Object head = SerializeUtil.readObject(headstr);
		if (head instanceof Map) {
			return (Map<String, Object>)head;
		} else {
			return null;
		}
		/*JSON序列化
		int headsize = buf.getInt();
		byte[] headstr = new byte[headsize];
		buf.get(headstr, 0, headsize);
		return JSONObject.fromObject(new String(headstr));
		*/
	}
	
	/**
	 * 功能描述:从byte数组中读取java对象
	 * <pre></pre> 
	 * @param dat byte数组
	 * @return    java对象，必须是java内置对象，不能是自定义的对象
	 */
	public Object getObject(byte[] dat) {
		return SerializeUtil.readObject(dat);
	}

	/*public void reportError(RecvPack pack, String errmsg) {
		if (pack.feedback) {
			send(packid, SYSCMD_FEEDBACK, pack.getAddress(), null == errmsg ? null : errmsg.getBytes(), null);
			//ByteBuffer pack = doPack(packid, SYSCMD_FEEDBACK, null == errmsg ? null : errmsg.getBytes(), 0);
			//_channel.send(pack, client);
		}
	}*/
	
	/**
	 * 功能描述:发送API函数1，同步执行
	 * <pre></pre> 
	 * @param type    发送类型
	 * @param addrstr 发送地址
	 * @param port    发送端口
	 * @param content 发送数据，类型为Object
	 * @param timeout 超时时间，如为0表示不需要反馈包，即不关心是否被成功接收
	 * @return  null表示没有错误，否则是错误信息
	 */
	public String send(String type, String addrstr, int port, Object obj, int timeout) {
		return send(UUID.randomUUID().toString(), type, addrstr, port, null == obj ? null : SerializeUtil.writeObject(obj), timeout);
	}
	public String send(String type, String addrstr, int port, byte[] content, int timeout) {
		return send(UUID.randomUUID().toString(), type, addrstr, port, content, timeout);
	}

	/**
	 * 功能描述:发送API函数2，同步执行
	 * <pre></pre> 
	 * @param type    发送类型
	 * @param addrstr 发送地址
	 * @param port    发送端口
	 * @param content 发送数据，类型为byte[]
	 * @param timeout 超时时间，如为0表示不需要反馈包，即不关心是否被成功接收
	 * @return  null表示没有错误，否则是错误信息
	 */
	public String send(String id, String type, String addrstr, int port, byte[] content, int timeout) {
		if (timeout>0) {
			boolean ok = true;
			FeedInfo inf = new FeedInfo(false, null);
			for (int i=0; i<RETRYTIMES; ++i) {
				_feed.put(id, inf);
				_core.send(id, type, new InetSocketAddress(addrstr, port), content, inf);
				synchronized(inf) {
					try {
						long t = System.currentTimeMillis();
						inf.wait(timeout);
						System.out.println(System.currentTimeMillis()-t);
						ok = inf.ok;
					} catch (InterruptedException e) {
						e.printStackTrace();
						ok = false;
					}
				}
				_feed.remove(id);
				if (ok) break;
			}
			return ok ? null : (inf.msg == null ? "" : inf.msg);
		} else {
			//DatagramSocket socket = _core._channel.socket();
			_core.send(id, type, new InetSocketAddress(addrstr, port), content, null);
			return null;
		}
	}
}
